package vip.meeet.kafka;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Demo consumer that subscribes to a topic and, whenever partitions are assigned to it,
 * reports each partition's committed offset and seeks selected partitions to a fixed offset
 * before consuming.
 *
 * <p>The rebalance listener is only invoked from inside {@code poll()}, so the demo polls a
 * bounded number of times after subscribing; without polling the assignment callback (and
 * therefore the seek) would never run.
 */
public class SeekA {

    /** Topic this demo subscribes to. */
    private static final String TOPIC = "test";

    /** Maximum time, in milliseconds, a single poll waits for records. */
    private static final long POLL_TIMEOUT_MS = 1000L;

    /** Number of polls performed before the demo closes the consumer and exits. */
    private static final int POLL_ROUNDS = 10;

    /**
     * Runs the demo: subscribes with a seeking rebalance listener, polls {@link #POLL_ROUNDS}
     * times printing every record received, then closes the consumer.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // Offsets to jump to when the corresponding partition is assigned to this consumer.
        // Built once here instead of on every rebalance.
        final Map<TopicPartition, Long> seekTargets = new HashMap<>();
        seekTargets.put(new TopicPartition(TOPIC, 0), 200L);

        // try-with-resources closes the consumer even if subscribe/poll throws.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig())) {

            // Alternative to subscribe(): consumer.assign(...) pins partitions manually,
            // in which case no rebalance listener is involved.
            consumer.subscribe(Collections.singletonList(TOPIC), new ConsumerRebalanceListener() {

                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // Nothing to do on revocation in this demo.
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    System.out.println("Assigned " + partitions);
                    for (TopicPartition tp : partitions) {
                        OffsetAndMetadata oam = consumer.committed(tp);
                        if (oam != null) {
                            System.out.println("Current offset is " + oam.offset());
                        } else {
                            System.out.println("No committed offsets");
                        }
                        Long offset = seekTargets.get(tp);
                        if (offset != null) {
                            System.out.println("Seeking to " + offset);
                            consumer.seek(tp, offset);
                        }
                    }
                }
            });

            // Other positioning options once a partition is assigned:
            // consumer.seek(partition, 220) or consumer.seekToEnd(partitions).

            // Polling is what joins the group and triggers onPartitionsAssigned above.
            // NOTE(review): poll(long) is used because it is available alongside
            // committed(TopicPartition); on 2.0+ clients poll(Duration) is the preferred
            // overload - confirm the client version and switch if possible.
            for (int round = 0; round < POLL_ROUNDS; round++) {
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("partition=" + record.partition()
                            + " offset=" + record.offset()
                            + " key=" + record.key()
                            + " value=" + record.value());
                }
            }
        }
    }

    /** Builds the consumer configuration used by the demo. */
    private static Properties consumerConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "joe");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
